package com.example.service;

import com.example.bean.Position;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;
import com.example.util.DBHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

import java.util.stream.Collectors;

@Service("positionService")
public class PositionService {

    @Autowired
    ElasticsearchRestTemplate elasticsearchTemplate;
    @Autowired
    RestHighLevelClient client;

    private static final String POSITIOIN_INDEX = "position";


    public List<Map> queryPositions(String query)  {

        List<Map> positionList = new ArrayList<>();

        IndexCoordinates indexCoordinates =  IndexCoordinates.of("position");

        NativeSearchQuery nativeSearchQuery = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.matchQuery("positionName",query))
                .withPageable(PageRequest.of(0, 5))
                .build();

        SearchHits<Map> search = elasticsearchTemplate.search(nativeSearchQuery, Map.class,indexCoordinates);

        List<SearchHit<Map>> searchHits = search.getSearchHits();

        if (searchHits.size() > 0 ){
            List<Map> searchPositionList = searchHits.stream().map(searchHit -> searchHit.getContent()).collect(Collectors.toList());
            positionList.addAll(searchPositionList);
        }

        //美女多、员工福利好
        if(positionList.size()<5){
            NativeSearchQuery nativeSearchQuery1 = new NativeSearchQueryBuilder()
                    .withQuery(QueryBuilders.matchQuery("positionAdvantage","美女多、员工福利好"))
                    .withPageable(PageRequest.of(0, 5-positionList.size()))
                    .build();
            SearchHits<Map> search1 = elasticsearchTemplate.search(nativeSearchQuery1, Map.class,indexCoordinates);
            List<SearchHit<Map>> searchHits1 = search1.getSearchHits();
            if (searchHits1.size() > 0){
                List<Map>  otherPositionList = searchHits1.stream().map(searchHit -> searchHit.getContent()).collect(Collectors.toList());
                positionList.addAll(otherPositionList);
            }
        }

        return  positionList;
    }


    public void importAll() throws IOException {
        writeMysqlDataToES(POSITIOIN_INDEX);
    }
    /** 讲数据批量写入ES中 */
    private void writeMysqlDataToES(String tableName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = DBHelper.getConn();
            System.out.println("Start handle data :" + tableName);
            String sql = "SELECT * from " + tableName;
            ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
                    ResultSet.CONCUR_READ_ONLY);
            // 根据自己需要 设置
            ps.setFetchSize(20);
            rs = ps.executeQuery();
            ResultSetMetaData colData = rs.getMetaData();
            ArrayList<HashMap<String, String>> dataList = new
                    ArrayList<HashMap<String, String>>();
            // bulkProcessor 添加的数据支持的方式并不多，查看其api发现其支持map键值对的方式，故笔者在此将查出来的数据转换成hashMap方式
            HashMap<String, String> map = null;
            int count = 0;
            String c = null;
            String v = null;
            while (rs.next()) {
                count++;
                map = new HashMap<String, String>(128);
                for (int i = 1; i <= colData.getColumnCount(); i++) {
                    c = colData.getColumnName(i);
                    v = rs.getString(c);
                    map.put(c, v);
                }
                dataList.add(map);
                // 每1万条写一次，不足的批次的最后再一并提交
                if (count % 10000 == 0) {
                    System.out.println("Mysql handle data number : " + count);
                    // 将数据添加到 bulkProcessor 中
                    for (HashMap<String, String> hashMap2 : dataList) {
                        bulkProcessor.add(
                                new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
                    }
                    // 每提交一次便将map与list清空
                    map.clear();
                    dataList.clear();
                }
            }
            // 处理未提交的数据
            for (HashMap<String, String> hashMap2 : dataList) {
                bulkProcessor.add(
                        new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
                System.out.println(hashMap2);
            }
            System.out.println("-------------------------- Finally insert number total: " + count);
            // 将数据刷新到es, 注意这一步执行后并不会立即生效，取决于bulkProcessor设置的刷新时间
            bulkProcessor.flush();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            try {
                rs.close();
                ps.close();
                conn.close();
                boolean terminatedFlag = bulkProcessor.awaitClose(150L,
                        TimeUnit.SECONDS);
                System.out.println(terminatedFlag);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }

    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor bulkProcessor = null;
        try {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    System.out.println("Try to insert data number : " + request.numberOfActions());
                }
                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      BulkResponse response) {
                    System.out.println("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId);
                }
                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      Throwable failure) {
                    System.out.println("Bulk is unsuccess : " + failure + ",executionId: " + executionId);
                }
            };
            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                    (request, bulkListener) -> client
                            .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,
                    listener);
            builder.setBulkActions(5000);
            builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
            builder.setConcurrentRequests(10);
            builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
            // 注意点：让参数设置生效
            bulkProcessor = builder.build();
        } catch (Exception e) {
            e.printStackTrace();
            try {
                bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
            } catch (Exception e1) {
                System.out.println(e1.getMessage());
            }
        }
        return bulkProcessor;
    }
}



